import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.text.SimpleDateFormat;
import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.*;

public class DistinctAggregation4 {


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


        DataStream<OrderStream> orderA = env.fromCollection(Arrays.asList(
                new OrderStream(1, 1L, "beer", 3, 1505529000L), //2017-09-16 10:30:00
                new OrderStream(2, 1L, "beer", 3, 1505529000L), //2017-09-16 10:30:00
                new OrderStream(3, 3L, "rubber", 2, 1505527800L),//2017-09-16 10:10:00
                new OrderStream(4, 3L, "rubber", 2, 1505527800L),//2017-09-16 10:10:00
                new OrderStream(5, 1L, "diaper", 4, 1505528400L),//2017-09-16 10:20:00
                new OrderStream(6, 1L, "diaper", 4, 1505528400L)//2017-09-16 10:20:00
        ));




        Table orders=tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<OrderStream>()
        {

            Long currentMaxTimestamp = 0L;
            final Long maxOutOfOrderness = 10000L;// 最大允许的乱序时间是10s

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

            //            @Nullable
            @Override
            public org.apache.flink.streaming.api.watermark.Watermark getCurrentWatermark()
            {

                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }


            @Override
            public long extractTimestamp(OrderStream element, long recordTimestamp)
            {
                long timestamp = element.rowtime;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);

                System.out.println("id:"+element.id+","+"user:"+element.user+"," +
                        "eventtime:["+element.rowtime+"|"+sdf.format(element.rowtime)+"" +
                        "],currentMaxTimestamp:["+currentMaxTimestamp+"|"+
                        sdf.format(currentMaxTimestamp)+"],watermark:["+
                        getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");
                return timestamp;
            }
        }),$("user"), $("product"), $("amount"), $("rowtime").rowtime());



// Use distinct aggregation for user-defined aggregate functions
        tEnv.registerFunction("myUdagg", new MyUdagg());
        Table result=orders.groupBy("user")
                .select(
                        $("user"),
                        call("myUdagg", $("amount")).distinct().as("myDistinctResult")
                );

//        Table result=orders.groupBy("user")
//                .select(
//                        $("user")
//                );



        tEnv.toRetractStream(result, Row.class).print();
        env.execute();
    }

}
